1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package storm.starter.bolt;
19
20 import backtype.storm.Config;
21 import backtype.storm.topology.BasicOutputCollector;
22 import backtype.storm.topology.OutputFieldsDeclarer;
23 import backtype.storm.tuple.Fields;
24 import backtype.storm.tuple.Tuple;
25 import backtype.storm.tuple.Values;
26 import org.testng.annotations.DataProvider;
27 import org.testng.annotations.Test;
28 import storm.starter.tools.MockTupleHelpers;
29 import storm.starter.tools.Rankings;
30
31 import java.util.Map;
32
33 import static org.fest.assertions.api.Assertions.assertThat;
34 import static org.mockito.Matchers.any;
35 import static org.mockito.Mockito.*;
36
37 public class TotalRankingsBoltTest {
38
39 private static final String ANY_NON_SYSTEM_COMPONENT_ID = "irrelevant_component_id";
40 private static final String ANY_NON_SYSTEM_STREAM_ID = "irrelevant_stream_id";
41 private static final Object ANY_OBJECT = new Object();
42 private static final int ANY_TOPN = 10;
43 private static final long ANY_COUNT = 42;
44
45 private Tuple mockRankingsTuple(Object obj, long count) {
46 Tuple tuple = MockTupleHelpers.mockTuple(ANY_NON_SYSTEM_COMPONENT_ID, ANY_NON_SYSTEM_STREAM_ID);
47 Rankings rankings = mock(Rankings.class);
48 when(tuple.getValue(0)).thenReturn(rankings);
49 return tuple;
50 }
51
52 @DataProvider
53 public Object[][] illegalTopN() {
54 return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
55 }
56
57 @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalTopN")
58 public void negativeOrZeroTopNShouldThrowIAE(int topN) {
59 new TotalRankingsBolt(topN);
60 }
61
62 @DataProvider
63 public Object[][] illegalEmitFrequency() {
64 return new Object[][]{ { -10 }, { -3 }, { -2 }, { -1 }, { 0 } };
65 }
66
67 @Test(expectedExceptions = IllegalArgumentException.class, dataProvider = "illegalEmitFrequency")
68 public void negativeOrZeroEmitFrequencyShouldThrowIAE(int emitFrequencyInSeconds) {
69 new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
70 }
71
72 @DataProvider
73 public Object[][] legalTopN() {
74 return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
75 }
76
77 @Test(dataProvider = "legalTopN")
78 public void positiveTopNShouldBeOk(int topN) {
79 new TotalRankingsBolt(topN);
80 }
81
82 @DataProvider
83 public Object[][] legalEmitFrequency() {
84 return new Object[][]{ { 1 }, { 2 }, { 3 }, { 20 } };
85 }
86
87 @Test(dataProvider = "legalEmitFrequency")
88 public void positiveEmitFrequencyShouldBeOk(int emitFrequencyInSeconds) {
89 new TotalRankingsBolt(ANY_TOPN, emitFrequencyInSeconds);
90 }
91
92 @Test
93 public void shouldEmitSomethingIfTickTupleIsReceived() {
94
95 Tuple tickTuple = MockTupleHelpers.mockTickTuple();
96 BasicOutputCollector collector = mock(BasicOutputCollector.class);
97 TotalRankingsBolt bolt = new TotalRankingsBolt();
98
99
100 bolt.execute(tickTuple, collector);
101
102
103
104 verify(collector).emit(any(Values.class));
105 }
106
107 @Test
108 public void shouldEmitNothingIfNormalTupleIsReceived() {
109
110 Tuple normalTuple = mockRankingsTuple(ANY_OBJECT, ANY_COUNT);
111 BasicOutputCollector collector = mock(BasicOutputCollector.class);
112 TotalRankingsBolt bolt = new TotalRankingsBolt();
113
114
115 bolt.execute(normalTuple, collector);
116
117
118 verifyZeroInteractions(collector);
119 }
120
121 @Test
122 public void shouldDeclareOutputFields() {
123
124 OutputFieldsDeclarer declarer = mock(OutputFieldsDeclarer.class);
125 TotalRankingsBolt bolt = new TotalRankingsBolt();
126
127
128 bolt.declareOutputFields(declarer);
129
130
131 verify(declarer, times(1)).declare(any(Fields.class));
132 }
133
134 @Test
135 public void shouldSetTickTupleFrequencyInComponentConfigurationToNonZeroValue() {
136
137 TotalRankingsBolt bolt = new TotalRankingsBolt();
138
139
140 Map<String, Object> componentConfig = bolt.getComponentConfiguration();
141
142
143 assertThat(componentConfig).containsKey(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
144 Integer emitFrequencyInSeconds = (Integer) componentConfig.get(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS);
145 assertThat(emitFrequencyInSeconds).isGreaterThan(0);
146 }
147 }